{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dagster\n",
    "\n",
    "from dagster_contrib.dagster_examples.pandas_hello_world.pipeline import define_pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sum_sq_pipeline = define_pipeline()\n",
    "sum_sq_pipeline\n",
    "\n",
    "from dagster.graphviz import build_graphviz_graph\n",
    "build_graphviz_graph(sum_sq_pipeline)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Below we'll demonstrate different ways of executing pipelines that may be useful in an interactive context:\n",
    "\n",
    "1) Execute a pipeline in memory up to a particular solid in the pipeline. Current stupid name: `execute_solid_in_pipeline`\n",
    "\n",
    "2) Execute a pipeline in memory while being able to inspect each result individually. Another stupid name: `execute_pipeline_and_collect`. Note: the more elegantly named `execute_pipeline` returns an iterator, but that is less friendly in a notebook environment."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This executes a pipeline through the solid name provided ('sum_sq')\n",
    "# Note that it does *not* execute the \"always_fails\" solid because it is not\n",
    "# in the dependency chain of \"sum_sq\"\n",
    "from dagster.core.execution import (ExecutionContext, execute_pipeline_through_solid)\n",
    "\n",
    "# Call through the names imported above so the import is actually used\n",
    "# and the cell does not mix two different access paths to the same API.\n",
    "result = execute_pipeline_through_solid(\n",
    "    ExecutionContext.create(),\n",
    "    sum_sq_pipeline,\n",
    "    {'num_csv': {'path': 'num.csv'}},\n",
    "    'sum_sq',\n",
    ")\n",
    "result.result_value"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "# This will execute the two pipeline steps and return the two results corresponding to each step.\n",
    "# The results are materialized into a list because they are iterated more than once\n",
    "# (here and in the next cell); a one-shot iterator would be empty on the second pass.\n",
    "results = list(dagster.execute_pipeline(\n",
    "    dagster.ExecutionContext.create(),\n",
    "    sum_sq_pipeline,\n",
    "    {'num_csv': {'path': 'num.csv'}},\n",
    "    through_solids=['sum_sq'],\n",
    "))\n",
    "\n",
    "[result.name for result in results]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "result_dict = { result.name : result for result in results }\n",
    "# the first result is the intermediate result out of the 'sum' solid\n",
    "result_dict['sum'].result_value"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# the next result is the output of the sum_sq\n",
    "result_dict['sum_sq'].result_value"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's inject some errors in the pipeline by invoking the `always_fails` solid."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "# This executes the pipeline through the 'always_fails' solid and indexes the results by name\n",
    "results = dagster.execute_pipeline(\n",
    "    dagster.ExecutionContext.create(), \n",
    "    sum_sq_pipeline, \n",
    "    {'num_csv': {'path': 'num.csv'}},\n",
    "    through_solids=['always_fails'],\n",
    ")\n",
    "\n",
    "result_dict = { result.name : result for result in results }\n",
    "result_dict"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "result_dict['sum'].success"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "result_dict['sum'].result_value"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "result_dict['always_fails'].success"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "result_dict['always_fails'].reason"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This method allows a result to reraise a user error with a clean callstack\n",
    "\n",
    "result_dict['always_fails'].reraise_user_error()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's actually output some stuff to a file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "output_path = '/tmp/notebook_sum_sq.csv'\n",
    "\n",
    "results = dagster.output_pipeline(\n",
    "    dagster.ExecutionContext.create(), \n",
    "    sum_sq_pipeline, \n",
    "    input_arg_dicts={'num_csv': {'path': 'num.csv'}},\n",
    "    output_arg_dicts={'sum_sq': {'CSV': {'path': output_path}}},\n",
    ")\n",
    "\n",
    "[result.name for result in results]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "pd.read_csv(output_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "dagster",
   "language": "python",
   "name": "dagster"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
